# Data & Classifier Imports
import time
import random
import scipy as sp
import pandas as pd
import numpy as np
from numpy import ones, zeros
from scipy.sparse import vstack
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.svm import LinearSVC, SVC
from sklearn.metrics import confusion_matrix
from data import Data
from features import get_features
from features import plot_features
from classifier import fit_clf, evaluate
# Attacker imports
import tensorflow as tf
from tensorflow.data import Dataset
from tensorflow.sparse import SparseTensor
import keras
import keras.backend as K
from keras.layers import Input, Dense, Activation, Lambda
from keras.layers import ActivityRegularization, Dropout
from keras.layers.merge import Maximum, Concatenate, Multiply
from keras.models import Model, load_model
from keras.optimizers import Adam, Nadam, Adagrad
from keras.regularizers import l1, l2, l1_l2
from keras.losses import binary_crossentropy
from keras.utils.vis_utils import plot_model
from attack import EvadeGAN
from attack import binarise
from attack import output_progress, plot_TPR_metrics, plot_confusion_matrix
from IPython.core.debugger import set_trace
from IPython.display import display, HTML, Image
%matplotlib inline
%load_ext autoreload
Main directories for data, plots, and saved GAN models
TOP_DIR = '../data/'                  # root directory for all data artefacts
PLOT_DIR = TOP_DIR + 'plots/'         # generated figures
ADV_DIR = PLOT_DIR + 'adversarial/'   # adversarial-example plots
TPR_DIR = PLOT_DIR + 'TPR/'           # TPR-metric plots
GAN_DIR = TOP_DIR + 'GAN/'            # GAN checkpoints (passed as gan_dir to training)
Reproducibility: fix all random seeds (Python, NumPy, TensorFlow).
# Fix the random seeds for reproducibility across Python, NumPy and TensorFlow
for _seed_fn in (random.seed, np.random.seed, tf.random.set_seed):
    _seed_fn(0)
print("Done \u2713")  # "Done ✓" (identical to the original escape-encoded literal)
Feature selection based on the $ \chi^2 $ test to pick the top 10K features that are most correlated with class labels (most discriminative between the two classes).
# Class labels, could change them with WGAN
MAL_LABEL = 1       # label for malware samples
GOOD_LABEL = 0      # label for goodware (benign) samples
n_features = 10000  # number of top chi^2-selected features to keep
# Load data
%autoreload
data = Data(n_features=n_features)
# Split data
X_train, X_test, Y_train, Y_test = data.split(test_size=0.3, random_state=0)
# Adjust Class Labels [Goodware is 0 or -1]
Y_train[Y_train == 0] = GOOD_LABEL
Y_test[Y_test == 0] = GOOD_LABEL
Y_train[Y_train == 1] = MAL_LABEL
Y_train[Y_train == 1] = MAL_LABEL
# Malware & its split subsets
X_malware = data.get_X_malware()
X_mal_train = data.get_X_mal_train()
X_mal_test = data.get_X_mal_test()
# Goodware & its split subsets
X_goodware = data.get_X_goodware()
X_good_train = data.get_X_good_train()
X_good_test = data.get_X_good_test()
# Fit the classifier (class weight 2:1 in favour of malware)
model = fit_clf(X_train, Y_train, C=1.0, weight={1:2, 0:1}, linearsvc=True,
                fname=f'c_1_top_{n_features}_test_0.3_weight_2_linearsvc')
# Get model parameters (either LinearSVC or SVC).
# Use isinstance (idiomatic) instead of type(...)==..., and fail loudly for
# any other model type instead of leaving `weights` undefined (NameError later).
C = model.C
if isinstance(model, LinearSVC):
    weights = model.coef_.flatten()            # LinearSVC stores a dense coef_
elif isinstance(model, SVC):
    weights = model.coef_.toarray().flatten()  # linear-kernel SVC stores sparse coef_
else:
    raise TypeError(f"Unsupported model type: {type(model).__name__}")
intercept = model.intercept_[0]
print("C =", C)
print("Weights =", np.round(weights, 4))
print(f"Non-zero Weights % = "
      f"{100*np.nonzero(weights)[0].shape[0]/weights.shape[0]:.2f}%")
print("Intercept =", intercept)
Feature weights (insight into the model complexity and interpretability)
%autoreload
# Visualise the fitted model's feature weights (model complexity / interpretability)
plot_features(model)
# Keep only the malware samples the target model classifies correctly (TPs);
# EvadeGAN will only attack samples the model currently detects.
print("TP (Malware) [True Positives, from each of the training & test sets]\n"
      "====================================================================")
# From the training set (duplicate comment line removed)
Y_pred = model.predict(X_mal_train)
TP_train = X_mal_train[np.where(Y_pred==MAL_LABEL)[0]]
print(f'All mal_train: {X_mal_train.shape[0]}')
print(f'TP_train: {TP_train.shape[0]} '
      f'({100*TP_train.shape[0]/X_mal_train.shape[0]:.2f}%)\n')
# From the test set
Y_pred = model.predict(X_mal_test)
TP_test = X_mal_test[np.where(Y_pred==MAL_LABEL)[0]]
print(f'All mal_test: {X_mal_test.shape[0]}')
print(f'TP_test: {TP_test.shape[0]} '
      f'({100*TP_test.shape[0]/X_mal_test.shape[0]:.2f}%)')
# Keep only the goodware samples the target model classifies correctly (TNs).
print("TN (Goodware) [True Negatives, from each of the training & test sets]\n"
      "=====================================================================")
# Training set: filter to correctly-classified goodware
Y_pred = model.predict(X_good_train)
TN_train = X_good_train[np.flatnonzero(Y_pred == GOOD_LABEL)]
n_good_train = X_good_train.shape[0]
print(f'All good_train: {n_good_train}')
print(f'TN_train: {TN_train.shape[0]} '
      f'({100*TN_train.shape[0]/n_good_train:.2f}%)\n')
# Test set: same filtering on the held-out goodware
Y_pred = model.predict(X_good_test)
TN_test = X_good_test[np.flatnonzero(Y_pred == GOOD_LABEL)]
n_good_test = X_good_test.shape[0]
print(f'All good_test: {n_good_test}')
print(f'TN_test: {TN_test.shape[0]} '
      f'({100*TN_test.shape[0]/n_good_test:.2f}%)')
EvadeGAN shares the same training and test data as the target model, but only those correctly classified by the target model are considered.
# Will only consider TP Malware
# NOTE(review): .todense() returns numpy.matrix (always 2-D); the later
# row-indexing/np.repeat code appears to rely on that — confirm before
# swapping for .toarray().
X_mal_train = TP_train.todense()
X_mal_test = TP_test.todense()
# Will only consider TN Goodware
X_good_train = TN_train.todense()
X_good_test = TN_test.todense()
print("All Dense!")
################################################################################
# EvadeGAN Mode
################################################################################
# Generator input mode: 'x' (sample only), 'z' (noise only), or 'xz' (both)
g_input = 'z'  # {'x', 'z', 'xz'}
################################################################################
# All dimensions
x_dim = n_features  # feature-vector dimension (10000)
z_dim = 100         # noise dimension; alternatives tried: 32, 64, 128, 256, 512, 1024
n_hidden = 256      # hidden-layer width for both generator and discriminator
################################################################################
# Loss hyperparameters
targetEvasionRate = 1.0   # 100% evasion (TPR=0.0)
alpha = 0.0015            # Weight for sparsity penalty (L1 regularisation)
beta = 500                # Weight for the upper bound penalty (hinge loss)
normalise_loss = True     # Whether to scale loss to the range [0, 1]
max_changes = 15          # Upper bound for changes, relax to 20 for EvadeGANz
bound_reduce_func = 'mean'  # function to reduce batch changes, 'mean' or 'max'
                            # ('max' is more restrictive)
################################################################################
bin_threshold = 0.5  # Binarisation threshold for generator outputs
# Parameters for building the generator
g_params = {'n_hidden': n_hidden,
            'h_activation': 'relu', 'batchnorm': False,
            # L1 encourages sparse perturbations; small L2 for stability
            'regularizers': {'activity_regularizer': l1_l2(alpha, 1e-5)},
            'out_activation': 'sigmoid',
            'drop_rate': 0.5}
# Parameters for building the discriminator
d_params = {'n_hidden': n_hidden,
            'h_activation': 'linear',
            'h_constraint': None,
            'out_activation': 'sigmoid'}
# Parameters for compiling the discriminator (learning parameters)
# NOTE(review): `lr` is the legacy Keras optimizer argument; newer Keras
# versions expect `learning_rate` — confirm against the installed version.
d_compile_params = {'loss': 'binary_crossentropy',
                    'optimizer': Nadam(lr=0.001, clipvalue=1.0),
                    'metrics': ['accuracy']}
# Parameters for compiling the combined GAN (custom evasion loss)
gan_compile_params = {'loss': 'custom',
                      'beta': beta, 'normalise_loss': normalise_loss,
                      'bound_func': bound_reduce_func, 'target_label': GOOD_LABEL,
                      'max_changes': max_changes,
                      'optimizer': Nadam(lr=0.0005, clipvalue=1.0),
                      'metrics': ['accuracy']}
# Sample-Independent Perturbations Z: build the EvadeGAN attack around the
# fitted target model, with the generator/discriminator configs above.
evadeGAN = EvadeGAN(target_model=model, x_dim=x_dim, z_dim=z_dim,
                    g_input=g_input,
                    bin_threshold=bin_threshold,
                    g_params=g_params, d_params=d_params,
                    d_compile_params=d_compile_params,
                    gan_compile_params=gan_compile_params)
%autoreload
# Render the generator architecture to PNG and show it inline.
plot_model(evadeGAN.generator, to_file="evadeGAN_gen.png", show_shapes=True,
           show_layer_names=True, rankdir="TB", expand_nested=True, dpi=96)
display(HTML("<h3>EvadeGAN Generator</h3><br />"))
Image("evadeGAN_gen.png")  # displayed as the cell's last expression in the notebook
# Same for the discriminator.
plot_model(evadeGAN.discriminator, to_file="evadeGAN_dis.png", show_shapes=True,
           show_layer_names=True, rankdir="TB", expand_nested=True, dpi=96)
display(HTML("<h3>EvadeGAN Discriminator</h3><br />"))
Image("evadeGAN_dis.png")
Training for 100 epochs. Best performance achieved after 58 epochs, with evasion rate = 100% and avg 13.8 changes per sample.
%autoreload
epochs=100
batch_size = 32
# batch_size = TP_train.shape[0] # whole dataset
# Train EvadeGAN against the target model; returns per-epoch TPR and
# average-perturbation curves for train/test, D/GAN metrics, and the path
# of the best generator checkpoint (saved under gan_dir).
TPR_train, TPR_test, avg_diff_train, avg_diff_test, d_metrics, gan_metrics, best_G_path = \
    evadeGAN.train(target_model=model, epochs=epochs, batch_size=batch_size,
                   n_progress=10, d_times=1, d_train_mal=True, d_train_adv=True,
                   gan_times=1, good_label=GOOD_LABEL, mal_label=MAL_LABEL,
                   X_mal_train=X_mal_train, X_mal_test=X_mal_test,
                   good_batch_factor=1, X_good_train=X_good_train,
                   X_good_test=X_good_test, max_changes=max_changes,
                   # stop target: TPR at or below 1 - targetEvasionRate
                   minTPR_threshold=1-targetEvasionRate,
                   gan_dir=GAN_DIR, smooth_alpha=1.0)
%autoreload
%autoreload
plot_TPR_metrics(TPR_train, TPR_test, avg_diff_train, avg_diff_test,
d_metrics, gan_metrics)
The discriminator is evaluated on all the samples that are correctly classified by the target model (True Positives & True Negatives).
%autoreload
# Evaluate discriminatior (how well does it approximate the target model)
Y_true_train = np.concatenate((MAL_LABEL * ones(X_mal_train.shape[0]),
GOOD_LABEL * ones(X_good_train.shape[0])))
Y_pred_train = \
np.concatenate(
(binarise(evadeGAN.discriminator.predict(X_mal_train), bin_threshold),
binarise(evadeGAN.discriminator.predict(X_good_train), bin_threshold))
)
Y_true_test = np.concatenate((MAL_LABEL * ones(X_mal_test.shape[0]),
GOOD_LABEL * ones(X_good_test.shape[0])))
Y_pred_test = \
np.concatenate(
(binarise(evadeGAN.discriminator.predict(X_mal_test), bin_threshold),
binarise(evadeGAN.discriminator.predict(X_good_test), bin_threshold))
)
plot_confusion_matrix(Y_true_train, Y_pred_train, Y_true_test, Y_pred_test,
title="Confusion Matrix of the Discriminator")
# Load the best generator checkpoint saved during training
generator = load_model(best_G_path)
%autoreload
pd.options.display.max_rows = 1000  # show long feature tables without truncation
A. Many X's and many Z's
# Matrix to array
x_good_train = np.asarray(X_good_train)  # To analyse perturbations
x_mal_train = np.asarray(X_mal_train)    # To analyse perturbations
x_mal_test = np.asarray(X_mal_test)      # To generate AEs
# One noise vector per test-malware sample, uniform in [0, 1)
z = np.random.uniform(0.0, 1.0, size=[X_mal_test.shape[0], z_dim])
x_adv = generator.predict([X_mal_test, z])
x_adv = binarise(x_adv).numpy()  # Binary AE (threshold the generator output)
perturbs = x_adv - x_mal_test    # Delta = x' - x (added/removed features)
B. One X and many Z's (does changing z produce diverse AEs?)
N = 100  # number of noise draws for the single fixed sample
# Repeat the first test-malware sample N times and pair with N distinct z's
x_one = np.repeat(X_mal_test[0], N, axis=0)
z_one = np.random.uniform(0.0, 1.0, size=[N, z_dim])
x_adv_one = generator.predict([x_one, z_one])
x_adv_one = binarise(x_adv_one).numpy()  # Binary AE
perturbs_one = x_adv_one - x_one         # Delta = x' - x
A. Many X's and many Z's
# Re-classify the AEs with the target model; nonzero predictions = detected (non-evasive)
y_adv = model.predict(x_adv)
print(f'{np.count_nonzero(y_adv)} AEs classified as malware.')  # Evasive AEs?
B. One X and many Z's (does changing z produce diverse AEs?)
# Same evasion check for the single-sample, many-z AEs
y_adv_one = model.predict(x_adv_one)
print(f'{np.count_nonzero(y_adv_one)} AEs classified as malware.')  # Evasive AEs?
A. Many X's and many Z's
# Perturbation diversity over many X's and many Z's.
# Typo fixes in the printed report: "Numer" -> "Number",
# "Pertubations" -> "Perturbations" (field width widened to keep alignment).
df_perturbs = pd.DataFrame(perturbs)
unique_perturbs = df_perturbs.drop_duplicates()
print(f"{'Number of AEs:':<21} {X_mal_test.shape[0]}")
print(f"{'Unique Perturbations:':<21} {unique_perturbs.shape[0]} "
      f"({100*unique_perturbs.shape[0]/X_mal_test.shape[0]:.2f}%)")
# Distribution of the number of changed features per sample
df_perturbs.astype(bool).sum(axis=1).describe()
B. One X and many Z's (does changing z produce diverse AEs?)
# Perturbation diversity for one X and many Z's (does varying z diversify AEs?).
# Same typo fixes as the many-X report: "Numer" -> "Number",
# "Pertubations" -> "Perturbations" (field width widened to keep alignment).
df_perturbs_one = pd.DataFrame(perturbs_one)
unique_perturbs_one = df_perturbs_one.drop_duplicates()
print(f"{'Number of AEs:':<21} {x_one.shape[0]}")
print(f"{'Unique Perturbations:':<21} {unique_perturbs_one.shape[0]} "
      f"({100*unique_perturbs_one.shape[0]/x_one.shape[0]:.2f}%)")
# Distribution of the number of changed features per sample
df_perturbs_one.astype(bool).sum(axis=1).describe()
# Per-feature summary: how often each feature appears in perturbations vs.
# its weight and frequency in malware/goodware.
feature_names = data.get_feature_names()
feature_weights = weights
perturb_count = pd.DataFrame(perturbs).astype(bool).sum(axis=0)         # count of perturbations touching each feature
mal_test_count = pd.DataFrame(X_mal_test).astype(bool).sum(axis=0)      # feature count in testing malware
mal_train_count = pd.DataFrame(X_mal_train).astype(bool).sum(axis=0)    # feature count in training malware
good_train_count = pd.DataFrame(X_good_train).astype(bool).sum(axis=0)  # feature count in training goodware
df = pd.DataFrame({'FeatureName': feature_names,
                   'FeatureWeight': feature_weights,
                   'malTestCount': mal_test_count,
                   'malTrainCount': mal_train_count,
                   'malTrainFreq': mal_train_count / X_mal_train.shape[0],    # fraction of training malware with the feature
                   'goodTrainCount': good_train_count,
                   'goodTrainFreq': good_train_count / X_good_train.shape[0],  # fraction of training goodware with the feature
                   'PerturbCount': perturb_count})
Inspect the most frequently perturbed features: check their weights and their frequencies in malware (training and test sets) and in goodware.
# Top 100 most frequently perturbed features (displayed as the cell output)
df.sort_values(['PerturbCount'], ascending=False).round(5).head(100) # Top 100
# Persist the 100 highest-weight features to CSV
# (presumably malware-indicative since MAL_LABEL = 1 — confirm sign convention)
df.sort_values(['FeatureWeight'], ascending=False).round(5).head(100).to_csv('features_top_pos.csv', sep='\t')
# Display the 100 lowest-weight features (presumably goodware-indicative)
df.sort_values(['FeatureWeight'], ascending=True).round(5).head(100)